Apache Flink serves as a robust open-source distributed processing engine that provides advanced programming interfaces for both stream and batch processing, excelling in stateful processing and event time semantics. It supports various programming languages, including Java, Python, Scala, and SQL, allowing users to utilize multiple APIs interchangeably within a single application.
The Amazon Managed Service for Apache Flink has now integrated support for Apache Flink version 1.18.1, which is the latest version available at the time of this writing. In this article, we will explore some of the exciting new features and enhancements introduced in the recent major releases: 1.16, 1.17, and 1.18, all of which are now available in the Managed Service for Apache Flink.
New Connectors
Before we delve into the new functionalities of Apache Flink version 1.18.1, let’s take a look at the new capabilities provided by the addition of several open-source connectors.
OpenSearch
A dedicated OpenSearch connector is now available for inclusion in your projects, allowing Apache Flink applications to write data directly to OpenSearch without relying on the Elasticsearch compatibility mode. This connector works with both Amazon OpenSearch Service provisioned domains and Amazon OpenSearch Serverless. It supports the SQL and Table APIs, in both Java and Python, as well as the DataStream API (Java only). By default, it provides at-least-once guarantees, synchronizing writes with Flink checkpointing. You can achieve effectively exactly-once semantics by using deterministic IDs and the upsert method. The connector defaults to the OpenSearch version 1.x client libraries; you can switch to version 2.x by adding the appropriate dependencies.
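As a minimal illustration, an OpenSearch sink table can be defined in SQL as in the following sketch. The endpoint and index names here are hypothetical, and declaring a primary key is what enables the deterministic document IDs mentioned above; check the connector documentation for the full set of options:
CREATE TABLE orders_index (
    order_id STRING,
    total DOUBLE,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'opensearch',
    'hosts' = 'https://my-domain.us-east-1.es.amazonaws.com:443',
    'index' = 'orders'
);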
Amazon DynamoDB
Apache Flink developers can now use a dedicated connector to write data into Amazon DynamoDB. This connector is based on the Apache Flink AsyncSink, developed by AWS and now an integral part of the Apache Flink project, which simplifies the implementation of efficient sink connectors using non-blocking write requests and adaptive batching. It supports the SQL and Table APIs, in both Java and Python, as well as the DataStream API (Java only). By default, it writes in batches to optimize throughput. A noteworthy feature of the SQL version is support for the PARTITIONED BY clause, which enables client-side deduplication by sending only the latest record per key with each batch write. This connector functions solely as a sink; reading from DynamoDB requires implementing a lookup using the Flink Async I/O API in the DataStream API, or a custom user-defined function (UDF) in SQL.
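The following sketch shows how the PARTITIONED BY clause described above fits into a SQL sink table definition (the table name and Region are hypothetical):
CREATE TABLE orders_ddb (
    order_id STRING,
    customer_id STRING,
    total DOUBLE
) PARTITIONED BY (order_id)
WITH (
    'connector' = 'dynamodb',
    'table-name' = 'orders',
    'aws.region' = 'us-east-1'
);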
MongoDB
The new MongoDB connector provides both a source and a sink for the SQL and Table APIs as well as the DataStream API. Now officially part of the Apache Flink project and supported by the community, it replaces the earlier connector provided by MongoDB, which supported only the older Flink Sink and Source APIs. The source can be used as a bounded source in batch mode or for lookups, with the option to enable caching when it is used for lookups. The sink operates in both batch and streaming modes, supporting upsert and append output. Out of the box, the sink provides at-least-once guarantees; when a primary key is defined, it can support exactly-once semantics via idempotent upserts.
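A minimal SQL definition of a MongoDB sink table might look like the following sketch (the connection string, database, and collection names are hypothetical); here again, declaring a primary key is what enables the idempotent upserts mentioned above:
CREATE TABLE customers_mongo (
    customer_id STRING,
    address STRING,
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb',
    'uri' = 'mongodb://user:password@mongo-host:27017',
    'database' = 'my_db',
    'collection' = 'customers'
);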
New Connector Versioning
Although not a new feature per se, the new connector versioning scheme is crucial when updating older Apache Flink applications. Beginning with Apache Flink version 1.17, most connectors have been externalized from the main Flink distribution and are now versioned independently. To include the correct dependency, specify the artifact version in the format <connector-version>-<flink-version>.
For instance, at the time of writing, the latest Kafka connector that is compatible with Amazon Managed Streaming for Apache Kafka (Amazon MSK) is version 3.1.0. For Apache Flink 1.18, the dependency would be:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.1.0-1.18</version>
</dependency>
For Amazon Kinesis, the new connector version is 4.2.0. For Apache Flink 1.18, the dependency would be:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis</artifactId>
    <version>4.2.0-1.18</version>
</dependency>
In the following sections, we discuss some of the more powerful features introduced in these recent releases and supported in Amazon Managed Service for Apache Flink.
SQL Enhancements
Within Apache Flink SQL, users can now provide hints for join queries to influence the query plan. In streaming applications especially, lookup joins are used to enrich a table representing streaming data with data queried from an external system, typically a database. Since version 1.16, several improvements have been introduced for lookup joins, giving you more control over their performance and behavior:
The lookup cache feature allows for in-memory caching of frequently used records, reducing the load on the database. Previously limited to specific connectors, this feature is now available to all connectors that support lookup (FLIP-221). As of this writing, the JDBC, Hive, and HBase connectors support lookup cache, offering three modes: FULL, to cache a small dataset entirely; PARTIAL, to cache only the most recent lookups against a large dataset; and NONE, to disable caching. For the PARTIAL cache, you can configure the number of rows to buffer and the time-to-live (see the configuration sketch after this list).
Async lookup is another feature that can greatly improve performance. It allows Apache Flink SQL to issue new lookup requests to the database without blocking the processing thread while waiting for responses to previous lookups. Similar to Async I/O, you can configure async lookup to enforce ordered results or allow unordered results, and you can adjust the buffer capacity and timeout. You can also set a lookup retry strategy, used with a PARTIAL or NONE lookup cache, to define the behavior in case of a failed lookup in the external database.
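As a concrete configuration sketch, a PARTIAL lookup cache can be enabled directly in the table definition. The example below assumes a hypothetical JDBC-backed Customers table; the unified option names come from FLIP-221, but check your connector's documentation for the options it actually supports:
CREATE TABLE Customers (
    customer_id STRING,
    address STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://db-host:3306/shop',
    'table-name' = 'customers',
    'lookup.cache' = 'PARTIAL',
    'lookup.partial-cache.max-rows' = '10000',
    'lookup.partial-cache.expire-after-write' = '1h'
);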
All of these behaviors can be controlled using a LOOKUP hint. For instance, a lookup join using async lookup can be expressed as follows:
SELECT /*+ LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered') */
    O.order_id, O.total, C.address
FROM Orders AS O
JOIN Customers FOR SYSTEM_TIME AS OF O.proc_time AS C
    ON O.customer_id = C.customer_id
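Similarly, a retry strategy for failed lookups can be expressed through the same hint. The following sketch uses the retry options documented for lookup joins; the specific delay and attempt values here are illustrative:
SELECT /*+ LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss',
    'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */
    O.order_id, O.total, C.address
FROM Orders AS O
JOIN Customers FOR SYSTEM_TIME AS OF O.proc_time AS C
    ON O.customer_id = C.customer_id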